package fun.easycode.datastream.repository;

import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;

/**
* <p>
* 数据任务 Repository
* </p>
*
* @author xuzhen97
* @since 2023-05-04
*/
@DS("data-stream")
public class TaskRepository extends ServiceImpl<TaskMapper, Task> {

    /**
     * 等待任务, 代表任务进入消息队列
     * @param taskId 任务id
     * @return 是否成功
     */
    public boolean waitTask(String taskId){
        LambdaUpdateWrapper<Task> updateWrapper = new LambdaUpdateWrapper<>();
        updateWrapper.eq(Task::getId, taskId);
        updateWrapper.set(Task::getState, TaskState.WAIT);
        return update(updateWrapper);
    }

    /**
     * 开始任务
     * @param taskId 任务id
     * @return 是否成功
     */
    public boolean startTask(String taskId){

        long exeStartMillis = System.currentTimeMillis();

        LambdaUpdateWrapper<Task> updateWrapper = new LambdaUpdateWrapper<>();
        updateWrapper.eq(Task::getId, taskId);
        updateWrapper.set(Task::getExeStartMillis, exeStartMillis);
        updateWrapper.set(Task::getState, TaskState.EXECUTING);
        return update(updateWrapper);
    }

    /**
     * 完成任务
     * @param taskId 任务id
     * @param totalSize 总大小
     * @return 是否成功
     */
    public boolean finishTask(String taskId, long totalSize){

        Task task = getById(taskId);

        // 任务不存在
        if(task == null){
            return false;
        }

        // 计算任务从生成到消费使用的总时长s
        long exeEndMillis = System.currentTimeMillis();
        int executeMillis =(int) ((exeEndMillis - task.getExeStartMillis())/ 1000);

        LambdaUpdateWrapper<Task> updateWrapper = new LambdaUpdateWrapper<>();
        updateWrapper.eq(Task::getId, task.getId());
        updateWrapper.set(Task::getState, TaskState.SUCCESS);
        updateWrapper.set(Task::getExecutionTime, executeMillis);
        // 主要是全量的时候需要更新，魔幻点先实现
        updateWrapper.set(Task::getTotalSize, totalSize);
        return update(updateWrapper);
    }

    /**
     * 任务失败
     * @param taskId 任务id
     * @param errorMsg 错误信息
     * @return 是否成功
     */
    public boolean failTask(String taskId, String errorMsg){
        LambdaUpdateWrapper<Task> updateWrapper = new LambdaUpdateWrapper<>();
        updateWrapper.eq(Task::getId, taskId);
        updateWrapper.set(Task::getState, TaskState.EXEC_FAILURE);
        updateWrapper.set(Task::getLastErrorMsg, errorMsg);
        return update(updateWrapper);
    }
}
